// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package plans

import (
	"github.com/juju/errors"
	"github.com/pingcap/tidb/context"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/expression/subquery"
	"github.com/pingcap/tidb/field"
	"github.com/pingcap/tidb/plan"
	"github.com/pingcap/tidb/util/format"
)

// RowStackFromPlan stores the origin row from the table in the current select
// phase, so that a later subquery can fetch values from the outer query.
// RowStackFromPlan is used after the From phase to get the origin row uniformly.
type RowStackFromPlan struct {
	// Src is the underlying plan that this plan wraps and reads rows from.
	Src plan.Plan
}

// Explain implements the plan.Plan Explain interface.
// It writes nothing of its own and only forwards the formatter to the source plan.
func (p *RowStackFromPlan) Explain(w format.Formatter) {
	src := p.Src
	src.Explain(w)
}

// GetFields implements the plan.Plan GetFields interface.
// The result fields are exactly those reported by the source plan.
func (p *RowStackFromPlan) GetFields() []*field.ResultField {
	fields := p.Src.GetFields()
	return fields
}

// Filter implements the plan.Plan Filter interface.
// The expression is offered to the source plan. When the source reports that
// it was filtered, the returned plan replaces Src and the result is (p, true, nil).
// Otherwise p is returned unchanged with false and the traced source error, if any.
func (p *RowStackFromPlan) Filter(ctx context.Context, expr expression.Expression) (plan.Plan, bool, error) {
	filtered, ok, err := p.Src.Filter(ctx, expr)
	if err != nil || !ok {
		return p, false, errors.Trace(err)
	}

	// The source accepted the filter; keep wrapping whatever plan it handed back.
	p.Src = filtered
	return p, true, nil
}

// Next implements the plan.Plan Next interface.
// It pulls one row from the source plan, copies the row's Data into its
// FromData, and records that data as the FromData of the top row stack item.
// A nil row is returned when the source yields no row or returns an error.
func (p *RowStackFromPlan) Next(ctx context.Context) (*plan.Row, error) {
	row, err := p.Src.Next(ctx)
	if err != nil || row == nil {
		return nil, errors.Trace(err)
	}

	data := row.Data
	row.FromData = data

	// The top item's OutData is reset to nil by this call.
	// NOTE(review): the error returned by updateRowStack is discarded;
	// confirm that a missing row stack is acceptable here.
	_ = updateRowStack(ctx, nil, data)

	return row, nil
}

// Close implements the plan.Plan Close interface.
// It closes the source plan and returns that plan's error as is.
func (p *RowStackFromPlan) Close() error {
	err := p.Src.Close()
	return err
}

// rowStackKeyType is a dedicated key type, used to avoid naming collisions
// in the context.
type rowStackKeyType int

// String implements the Stringer interface for debugging and pretty printing.
// Every value of the type prints the same label.
func (rowStackKeyType) String() string {
	return "row stack"
}

// rowStackKey is the context key used to retrieve the outer table references
// for a sub query.
const rowStackKey = rowStackKeyType(0)

// RowStack saves the current and outer row references.
// For every select, we push a RowStack item onto a stack for use by inner
// sub queries, so the top item always belongs to the current select.
// For example, in
//
//	select c1 from t1 where c2 = (select c1 from t2 where t2.c1 = t1.c2 limit 1)
//
// "select c1 from t1" is the outer query of the sub query in the where phase.
// We first push an item saving the row data for "select c1 from t1", then push
// a second item saving the row data for "select c1 from t2".
// An item is pushed after the from phase and popped before the final phase,
// so there is always at least one item while a select is being evaluated.
type RowStack struct {
	// items holds one entry per nested select; the last element is the top.
	items []*rowStackItem
}

// rowStackItem holds the row data and the matching result fields of one
// select on the row stack.
type rowStackItem struct {
	// OutData is the output record data with select list;
	// OutDataFields are the result fields used to look up values in OutData.
	OutData       []interface{}
	OutDataFields []*field.ResultField
	// FromData is the first origin record data, generated by From;
	// FromDataFields are the result fields used to look up values in FromData.
	FromData       []interface{}
	FromDataFields []*field.ResultField
}

// getRowStack returns the row stack stored in ctx under rowStackKey,
// or nil when ctx holds no value for that key.
func getRowStack(ctx context.Context) *RowStack {
	if v := ctx.Value(rowStackKey); v != nil {
		// Anything stored under rowStackKey must be a *RowStack, so the
		// assertion is unchecked.
		return v.(*RowStack)
	}
	return nil
}

// pushRowStack pushes a new item carrying the given field lists onto the row
// stack stored in ctx, creating the stack first when ctx has none.
// The new item starts without row data; the stack is then stored back into ctx.
func pushRowStack(ctx context.Context, outDataFields []*field.ResultField, fromDataFields []*field.ResultField) {
	stack := getRowStack(ctx)
	if stack == nil {
		stack = &RowStack{items: make([]*rowStackItem, 0, 1)}
	}

	item := &rowStackItem{
		OutDataFields:  outDataFields,
		FromDataFields: fromDataFields,
	}
	stack.items = append(stack.items, item)

	ctx.SetValue(rowStackKey, stack)
}

// updateRowStack replaces the row data held by the top item of the row stack
// stored in ctx, i.e. the item for the current select.
// outData and fromData overwrite the item's OutData and FromData; the item's
// field lists are left untouched.
// It returns an error if ctx has no row stack or the stack has no items.
func updateRowStack(ctx context.Context, outData []interface{}, fromData []interface{}) error {
	s := getRowStack(ctx)
	// Guard the empty-slice case as well, as popRowStack does: indexing
	// s.items[len(s.items)-1] on an empty stack would panic.
	if s == nil || len(s.items) == 0 {
		return errors.Errorf("update empty row stack")
	}

	t := s.items[len(s.items)-1]
	t.OutData = outData
	t.FromData = fromData
	return nil
}

// popRowStack removes the top item from the row stack stored in ctx.
// When the stack becomes empty its value is cleared from ctx; otherwise the
// shortened stack is stored back. It returns an error if there is nothing to pop.
func popRowStack(ctx context.Context) error {
	stack := getRowStack(ctx)
	if stack == nil || len(stack.items) == 0 {
		return errors.Errorf("pop empty row stack")
	}

	last := len(stack.items) - 1
	// Clear the slot so the backing array no longer references the popped item.
	stack.items[last] = nil
	stack.items = stack.items[:last]

	// last is now the number of remaining items.
	if last == 0 {
		ctx.ClearValue(rowStackKey)
	} else {
		ctx.SetValue(rowStackKey, stack)
	}
	return nil
}

// getIdentValueFromOuterQuery resolves the identifier name against the rows of
// the outer queries recorded on the row stack in ctx.
// The top item belongs to the current select and is skipped; the remaining
// items are searched from the one just below the top down to the bottom.
// For each item the origin (From) data is tried first, then the select list
// output data. On the first hit the current subquery is marked as using the
// outer query and the value is returned. Lookup errors from GetIdentValue are
// not reported individually; if nothing matches, an "unknown field" error is
// returned.
func getIdentValueFromOuterQuery(ctx context.Context, name string) (interface{}, error) {
	stack := getRowStack(ctx)
	if stack == nil {
		return nil, errors.Errorf("unknown field %s", name)
	}

	// Start below the top item, which is the current select's own entry.
	for i := len(stack.items) - 2; i >= 0; i-- {
		item := stack.items[i]

		// Outer table reference takes precedence.
		if item.FromData != nil {
			if val, err := GetIdentValue(name, item.FromDataFields, item.FromData); err == nil {
				// Tell the current subquery that it uses the outer query.
				subquery.SetOuterQueryUsed(ctx)
				return val, nil
			}
		}

		// Fall back to the outer select list.
		if item.OutData != nil {
			if val, err := GetIdentValue(name, item.OutDataFields, item.OutData); err == nil {
				// Tell the current subquery that it uses the outer query.
				subquery.SetOuterQueryUsed(ctx)
				return val, nil
			}
		}
	}

	return nil, errors.Errorf("unknown field %s", name)
}
